/*
 * Unidata Platform
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 *
 * Commercial License
 * This version of Unidata Platform is licensed commercially and is the appropriate option for the vast majority of use cases.
 *
 * Please see the Unidata Licensing page at: https://unidata-platform.com/license/
 * For clarification or additional options, please contact: info@unidata-platform.com
 * -------
 * Disclaimer:
 * -------
 * THIS SOFTWARE IS DISTRIBUTED "AS-IS" WITHOUT ANY WARRANTIES, CONDITIONS AND
 * REPRESENTATIONS WHETHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE
 * IMPLIED WARRANTIES AND CONDITIONS OF MERCHANTABILITY, MERCHANTABLE QUALITY,
 * FITNESS FOR A PARTICULAR PURPOSE, DURABILITY, NON-INFRINGEMENT, PERFORMANCE AND
 * THOSE ARISING BY STATUTE OR FROM CUSTOM OR USAGE OF TRADE OR COURSE OF DEALING.
 */
package org.unidata.mdm.job.reindex.service.impl;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.unidata.mdm.core.util.JobUtils;
import org.unidata.mdm.data.service.job.AbstractRecordPartitioner;
import org.unidata.mdm.job.reindex.configuration.ReindexJobConfigurationConstants;

/**
 * Partitions reindex work. In the normal mode it delegates to
 * {@link AbstractRecordPartitioner} over the data tables; when
 * {@code processIdLog} is set it instead splits the per-operation id log table
 * ({@code reindex_id_log_<operationId>}) into fixed-size blocks via a PL/pgSQL
 * script and emits one partition per block.
 *
 * @author Denis Kostovarov
 */
@JobScope
public class ReindexDataJobDataPartitioner extends AbstractRecordPartitioner {
    /**
     * Logger
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(ReindexDataJobDataPartitioner.class);
    /**
     * If true, record's data will be reindexed
     */
    @Value("#{jobParameters[" + ReindexJobConfigurationConstants.PARAM_REINDEX_RECORDS + "] ?: false}")
    private boolean reindexRecords;
    /**
     * If true, rels will be reindexed
     */
    @Value("#{jobParameters[" + ReindexJobConfigurationConstants.PARAM_REINDEX_RELATIONS + "] ?: false}")
    private boolean reindexRelations;
    /**
     * If true, matching data will be reindexed
     */
    @Value("#{jobParameters[" + ReindexJobConfigurationConstants.PARAM_REINDEX_MATCHING + "] ?: false}")
    private boolean reindexMatching;
    /**
     * If true, classifiers data will be reindexed
     */
    @Value("#{jobParameters[" + ReindexJobConfigurationConstants.PARAM_REINDEX_CLASSIFIERS + "] ?: false}")
    private boolean reindexClassifiers;
    /**
     * If true, id log should be processed instead of normal data tables.
     */
    @Value("#{jobParameters[" + ReindexJobConfigurationConstants.PARAM_PROCESS_ID_LOG + "] ?: false}")
    private boolean processIdLog;
    /**
     * Comma separated reindex types.
     */
    @Value("#{jobParameters[" + ReindexJobConfigurationConstants.PARAM_REINDEX_TYPES + "]}")
    private String reindexTypes;
    /**
     * Unidata data source
     */
    @Qualifier("storageDataSource")
    @Autowired
    private DataSource storageDataSource;
    /**
     * Splits records into partitions.
     * Note, that the {@code gridSize} parameter is not used here: in id-log mode the
     * partition count is driven by the number of logged id blocks, otherwise it is
     * delegated to the parent partitioner.
     *
     * @param gridSize ignored
     * @return partition name to execution context mapping; empty if nothing to do
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        if (!reindexRecords && !reindexRelations && !reindexMatching && !reindexClassifiers) {
            LOGGER.info("No data kind specified for reindexing [reindexRecords, reindexRelations, reindexMatching, reindexClassifiers are all false]. Exiting.");
            return Collections.emptyMap();
        }

        if (processIdLog) {
            return collectLoggedIds();
        } else {
            setProcessedTypes(reindexTypes);
            return super.partition(gridSize);
        }
    }
    /**
     * Builds partitions from the id log table of the filtered operation.
     * Creates a session-temporary {@code __result} table, fills it block by block
     * with a PL/pgSQL loop and converts each resulting row into one execution context.
     *
     * @return partition name to execution context mapping; empty on missing/invalid
     *         operation id or on SQL failure
     */
    private Map<String, ExecutionContext> collectLoggedIds() {

        Map<String, String> filtersMap = getFilters(filters);
        String operationId = filtersMap.get(ReindexJobConfigurationConstants.OPERATION_ID_FILTER);
        if (StringUtils.isBlank(operationId)) {
            LOGGER.warn("No operation id filter supplied. Cannot resolve id log table. Exiting.");
            return Collections.emptyMap();
        }

        // The table name is interpolated into SQL below (identifiers cannot be bound
        // as statement parameters), so restrict it to a safe character set to rule out
        // SQL injection via job parameters. Note: the raw operation id is used here —
        // wrapping it in quotes first (as a previous revision did) corrupts the name
        // and breaks the PL/pgSQL string literal.
        final String tableName = "reindex_id_log_" + StringUtils.replace(operationId, "-", "_");
        if (!tableName.matches("[A-Za-z0-9_]+")) {
            LOGGER.warn("Operation id [{}] yields an invalid id log table name [{}]. Exiting.", operationId, tableName);
            return Collections.emptyMap();
        }

        int number = 0;
        final String createTable =
                "create temporary table __result (" +
                "    block_num int not null primary key," +
                "    start_id  uuid," +
                "    start_gsn bigint," +
                "    end_id    uuid," +
                "    end_gsn   bigint," +
                "    name      text" +
                ")";

        final String execBlocks = "do " +
                "$$ " +
                "declare" +
                "    exec_sql text;" +
                "    cur_gsn bigint := 1;" +
                "    cur_block int := 0;" +
                "    block_sz int := " + blockSize + ";" +
                "    table_name text := '" + tableName + "'::text; " +
                "begin" +
                " " +
                "exec_sql := ' with _block as (select id, etalon_id, name, operation_id from ' || table_name || ' where id >= $1 order by id limit $2),'" +
                "         || ' _block_start as ( select id, etalon_id, name, operation_id from _block order by id asc limit 1),'" +
                "         || ' _block_end as ( select id, etalon_id, name, operation_id from _block order by id desc limit 1)'" +
                "         || ' insert into __result (block_num, start_gsn, start_id, end_gsn, end_id, name)'" +
                "         || ' select $3, _block_start.id, _block_start.etalon_id, _block_end.id, _block_end.etalon_id, _block_end.name'" +
                "         || ' from _block_start, _block_end';" +
                " " +
                "    while true loop" +
                "        execute exec_sql using cur_gsn, block_sz, cur_block;" +
                "    " +
                "        cur_gsn := ( select __result.end_gsn from __result where __result.block_num = cur_block);" +
                "        if cur_gsn is null then" +
                "            exit;" +
                "        end if;" +
                "    " +
                "        cur_gsn := cur_gsn + 1;" +
                "        cur_block := cur_block + 1;" +
                "    " +
                "    end loop;" +
                " " +
                "end " +
                "$$;";

        final String selectBlocks = "select block_num, start_id, start_gsn, end_id, end_gsn, name from __result";
        // 'if exists' keeps cleanup idempotent when run after a mid-flight failure.
        final String dropTable = "drop table if exists __result";

        Map<String, ExecutionContext> result = new HashMap<>();
        try (Connection c = storageDataSource.getConnection();
             Statement s = c.createStatement()) {

            s.executeUpdate(createTable);
            try {
                s.execute(execBlocks);

                try (ResultSet rs = s.executeQuery(selectBlocks)) {
                    while (rs.next()) {

                        final ExecutionContext value = new ExecutionContext();
                        result.put(JobUtils.partitionName(number), value);

                        Long startGSN = rs.getLong("start_gsn");
                        Long endGSN = rs.getLong("end_gsn");
                        String type = rs.getString("name");

                        value.put(ReindexJobConfigurationConstants.PARAM_START_LSN, startGSN);
                        value.put(ReindexJobConfigurationConstants.PARAM_END_LSN, endGSN);
                        value.putString(ReindexJobConfigurationConstants.PARAM_ENTITY_NAME, type);
                        value.putString(ReindexJobConfigurationConstants.PARAM_PARTITION_ID, "partition" + number);
                        number++;

                        LOGGER.info("Finished {} block of LOGGED ids.", number);
                    }
                }
            } finally {
                // Best-effort cleanup — pooled connections keep session temp tables
                // alive across borrowings, so the drop must run even on failure.
                s.executeUpdate(dropTable);
            }

        } catch (SQLException e) {
            LOGGER.warn("SQLE caught.", e);
            return Collections.emptyMap();
        }

        return result;
    }
    /**
     * @param reindexRecords whether record data should be reindexed; null is treated as false
     */
    public void setReindexRecords(Boolean reindexRecords) {
        // TRUE.equals avoids an unboxing NPE on a null argument.
        this.reindexRecords = Boolean.TRUE.equals(reindexRecords);
    }
    /**
     * @param reindexRelations whether relations should be reindexed; null is treated as false
     */
    public void setReindexRelations(Boolean reindexRelations) {
        this.reindexRelations = Boolean.TRUE.equals(reindexRelations);
    }
}
